1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.maven.surefire.report.ConsoleLogger;
import org.junit.runner.Description;
import org.junit.runners.model.RunnerScheduler;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
35
36
37
38
39
40
41
42
43
44
45
46
47
48 public class Scheduler
49 implements RunnerScheduler
50 {
51 private final Balancer balancer;
52
53 private final SchedulingStrategy strategy;
54
55 private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56
57 private final Description description;
58
59 private final ConsoleLogger logger;
60
61 private volatile boolean shutdown = false;
62
63 private volatile boolean started = false;
64
65 private volatile boolean finished = false;
66
67 private volatile Controller masterController;
68
69
70
71
72
73
74
75 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy )
76 {
77 this( logger, description, strategy, -1 );
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, int concurrency )
94 {
95 this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, Balancer balancer )
114 {
115 strategy.setDefaultShutdownHandler( newShutdownHandler() );
116 this.logger = logger;
117 this.description = description;
118 this.strategy = strategy;
119 this.balancer = balancer;
120 masterController = null;
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
135 SchedulingStrategy strategy, Balancer balancer )
136 {
137 this( logger, description, strategy, balancer );
138 strategy.setDefaultShutdownHandler( newShutdownHandler() );
139 masterScheduler.register( this );
140 }
141
142
143
144
145
146
147
148
149 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
150 SchedulingStrategy strategy, int concurrency )
151 {
152 this( logger, description, strategy, concurrency );
153 strategy.setDefaultShutdownHandler( newShutdownHandler() );
154 masterScheduler.register( this );
155 }
156
157
158
159
160
161
162
163 public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
164 SchedulingStrategy strategy )
165 {
166 this( logger, description, masterScheduler, strategy, 0 );
167 }
168
169 private void setController( Controller masterController )
170 {
171 if ( masterController == null )
172 {
173 throw new NullPointerException( "null ExecutionController" );
174 }
175 this.masterController = masterController;
176 }
177
178
179
180
181
182 private boolean register( Scheduler slave )
183 {
184 boolean canRegister = slave != null && slave != this;
185 if ( canRegister )
186 {
187 Controller controller = new Controller( slave );
188 canRegister = !slaves.contains( controller );
189 if ( canRegister )
190 {
191 slaves.add( controller );
192 slave.setController( controller );
193 }
194 }
195 return canRegister;
196 }
197
198
199
200
201 private boolean canSchedule()
202 {
203 return !shutdown && ( masterController == null || masterController.canSchedule() );
204 }
205
206 protected void logQuietly( Throwable t )
207 {
208 ByteArrayOutputStream out = new ByteArrayOutputStream();
209 PrintStream stream = new PrintStream( out );
210 try
211 {
212 t.printStackTrace( stream );
213 }
214 finally
215 {
216 stream.close();
217 }
218 logger.info( out.toString() );
219 }
220
221 protected void logQuietly( String msg )
222 {
223 logger.info( msg );
224 }
225
226
227
228
229
230
231
232
233
234
235
236 protected ShutdownResult describeStopped( boolean stopNow )
237 {
238 Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
239 Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
240 stop( executedTests, incompleteTests, false, stopNow );
241 return new ShutdownResult( executedTests, incompleteTests );
242 }
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
261 boolean tryCancelFutures, boolean stopNow )
262 {
263 shutdown = true;
264 try
265 {
266 if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
267 {
268 if ( executedTests != null )
269 {
270 executedTests.add( description );
271 }
272
273 if ( incompleteTests != null && !finished )
274 {
275 incompleteTests.add( description );
276 }
277 }
278
279 for ( Controller slave : slaves )
280 {
281 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
282 }
283 }
284 finally
285 {
286 try
287 {
288 balancer.releaseAllPermits();
289 }
290 finally
291 {
292 if ( stopNow )
293 {
294 strategy.stopNow();
295 }
296 else if ( tryCancelFutures )
297 {
298 strategy.stop();
299 }
300 else
301 {
302 strategy.disable();
303 }
304 }
305 }
306 }
307
308 protected boolean shutdownThreadPoolsAwaitingKilled()
309 {
310 if ( masterController == null )
311 {
312 stop( null, null, true, false );
313 boolean isNotInterrupted = true;
314 if ( strategy != null )
315 {
316 isNotInterrupted = strategy.destroy();
317 }
318 for ( Controller slave : slaves )
319 {
320 isNotInterrupted &= slave.destroy();
321 }
322 return isNotInterrupted;
323 }
324 else
325 {
326 throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
327 }
328 }
329
330 protected void beforeExecute()
331 {
332 }
333
334 protected void afterExecute()
335 {
336 }
337
338 public void schedule( Runnable childStatement )
339 {
340 if ( childStatement == null )
341 {
342 logQuietly( "cannot schedule null" );
343 }
344 else if ( canSchedule() && strategy.canSchedule() )
345 {
346 try
347 {
348 boolean isNotInterrupted = balancer.acquirePermit();
349 if ( isNotInterrupted && !shutdown )
350 {
351 Runnable task = wrapTask( childStatement );
352 strategy.schedule( task );
353 started = true;
354 }
355 }
356 catch ( RejectedExecutionException e )
357 {
358 stop( null, null, true, false );
359 }
360 catch ( Throwable t )
361 {
362 balancer.releasePermit();
363 logQuietly( t );
364 }
365 }
366 }
367
368 public void finished()
369 {
370 try
371 {
372 strategy.finished();
373 }
374 catch ( InterruptedException e )
375 {
376 logQuietly( e );
377 }
378 finally
379 {
380 finished = true;
381 }
382 }
383
384 private Runnable wrapTask( final Runnable task )
385 {
386 return new Runnable()
387 {
388 public void run()
389 {
390 try
391 {
392 beforeExecute();
393 task.run();
394 }
395 finally
396 {
397 try
398 {
399 afterExecute();
400 }
401 finally
402 {
403 balancer.releasePermit();
404 }
405 }
406 }
407 };
408 }
409
410 protected ShutdownHandler newShutdownHandler()
411 {
412 return new ShutdownHandler();
413 }
414
415
416
417
418 private final class Controller
419 {
420 private final Scheduler slave;
421
422 private Controller( Scheduler slave )
423 {
424 this.slave = slave;
425 }
426
427
428
429
430 boolean canSchedule()
431 {
432 return Scheduler.this.canSchedule();
433 }
434
435 void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
436 boolean tryCancelFutures, boolean shutdownNow )
437 {
438 slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
439 }
440
441
442
443
444 boolean destroy()
445 {
446 return slave.strategy.destroy();
447 }
448
449 @Override
450 public int hashCode()
451 {
452 return slave.hashCode();
453 }
454
455 @Override
456 public boolean equals( Object o )
457 {
458 return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
459 }
460 }
461
462
463
464
465
466
467
468
469
470 public class ShutdownHandler
471 implements RejectedExecutionHandler
472 {
473 private volatile RejectedExecutionHandler poolHandler;
474
475 protected ShutdownHandler()
476 {
477 poolHandler = null;
478 }
479
480 public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
481 {
482 this.poolHandler = poolHandler;
483 }
484
485 public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
486 {
487 if ( executor.isShutdown() )
488 {
489 Scheduler.this.stop( null, null, true, false );
490 }
491 final RejectedExecutionHandler poolHandler = this.poolHandler;
492 if ( poolHandler != null )
493 {
494 poolHandler.rejectedExecution( r, executor );
495 }
496 }
497 }
498 }